今天要學習量化交易的三個重要階段:回測、虛擬單、上線。這就像爸爸種新品種作物的流程一樣:先看歷史資料研究(回測),然後在小塊地試種(虛擬單),最後才大規模種植(上線)。這個循序漸進的過程確保我們不會因為貿然行動而遭受重大損失!
回測就像查看農場的歷史收成記錄:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
class BacktestFramework:
"""回測框架"""
def __init__(self, initial_capital=100000):
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.positions = {}
self.trades = []
self.equity_curve = []
def load_historical_data(self, symbol, start_date, end_date):
"""載入歷史數據"""
# 生成模擬歷史數據
dates = pd.date_range(start_date, end_date, freq='1D')
np.random.seed(42)
# 模擬價格走勢
returns = np.random.normal(0.001, 0.02, len(dates))
prices = [50000] # 起始價格
for ret in returns[1:]:
prices.append(prices[-1] * (1 + ret))
data = pd.DataFrame({
'timestamp': dates,
'open': prices,
'high': [p * (1 + abs(np.random.normal(0, 0.005))) for p in prices],
'low': [p * (1 - abs(np.random.normal(0, 0.005))) for p in prices],
'close': prices,
'volume': np.random.uniform(1000, 5000, len(dates))
})
return data.set_index('timestamp')
def calculate_indicators(self, data):
"""計算技術指標"""
# 移動平均線
data['sma_20'] = data['close'].rolling(20).mean()
data['sma_50'] = data['close'].rolling(50).mean()
# RSI
delta = data['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
data['rsi'] = 100 - (100 / (1 + rs))
# 布林帶
data['bb_middle'] = data['close'].rolling(20).mean()
bb_std = data['close'].rolling(20).std()
data['bb_upper'] = data['bb_middle'] + (bb_std * 2)
data['bb_lower'] = data['bb_middle'] - (bb_std * 2)
return data
def generate_signals(self, data, strategy_name='ma_crossover'):
"""生成交易信號"""
if strategy_name == 'ma_crossover':
# 移動平均交叉策略
data['signal'] = 0
data['signal'] = np.where(
data['sma_20'] > data['sma_50'], 1, 0
)
data['position'] = data['signal'].diff()
elif strategy_name == 'rsi_reversal':
# RSI反轉策略
data['signal'] = 0
data['signal'] = np.where(data['rsi'] < 30, 1,
np.where(data['rsi'] > 70, -1, 0))
data['position'] = data['signal'].diff()
return data
def execute_backtest(self, data, commission=0.001):
"""執行回測"""
for timestamp, row in data.iterrows():
if pd.isna(row['position']) or row['position'] == 0:
continue
# 執行交易
if row['position'] == 1: # 買入信號
self._execute_buy(timestamp, row['close'], commission)
elif row['position'] == -1: # 賣出信號
self._execute_sell(timestamp, row['close'], commission)
# 記錄權益曲線
portfolio_value = self._calculate_portfolio_value(row['close'])
self.equity_curve.append({
'timestamp': timestamp,
'portfolio_value': portfolio_value,
'price': row['close']
})
def _execute_buy(self, timestamp, price, commission):
"""執行買入"""
if 'BTC' not in self.positions:
# 使用80%資金買入
trade_amount = self.current_capital * 0.8
quantity = trade_amount / price
cost = quantity * price * (1 + commission)
if cost <= self.current_capital:
self.positions['BTC'] = quantity
self.current_capital -= cost
self.trades.append({
'timestamp': timestamp,
'type': 'buy',
'quantity': quantity,
'price': price,
'cost': cost,
'commission': cost * commission
})
def _execute_sell(self, timestamp, price, commission):
"""執行賣出"""
if 'BTC' in self.positions:
quantity = self.positions['BTC']
revenue = quantity * price * (1 - commission)
self.current_capital += revenue
del self.positions['BTC']
self.trades.append({
'timestamp': timestamp,
'type': 'sell',
'quantity': quantity,
'price': price,
'revenue': revenue,
'commission': revenue * commission
})
def _calculate_portfolio_value(self, current_price):
"""計算投資組合價值"""
portfolio_value = self.current_capital
if 'BTC' in self.positions:
portfolio_value += self.positions['BTC'] * current_price
return portfolio_value
def generate_performance_report(self):
"""生成績效報告"""
if not self.equity_curve:
return {}
equity_df = pd.DataFrame(self.equity_curve)
equity_df['returns'] = equity_df['portfolio_value'].pct_change()
# 計算績效指標
total_return = (equity_df['portfolio_value'].iloc[-1] / self.initial_capital) - 1
annual_return = (1 + total_return) ** (365 / len(equity_df)) - 1
volatility = equity_df['returns'].std() * np.sqrt(365)
sharpe_ratio = annual_return / volatility if volatility > 0 else 0
# 最大回撤
equity_df['peak'] = equity_df['portfolio_value'].expanding().max()
equity_df['drawdown'] = (equity_df['portfolio_value'] - equity_df['peak']) / equity_df['peak']
max_drawdown = equity_df['drawdown'].min()
# 交易統計
trades_df = pd.DataFrame(self.trades)
if len(trades_df) > 0:
buy_trades = trades_df[trades_df['type'] == 'buy']
sell_trades = trades_df[trades_df['type'] == 'sell']
total_trades = min(len(buy_trades), len(sell_trades))
if total_trades > 0:
# 計算每筆交易的盈虧
trade_pnl = []
for i in range(total_trades):
buy_cost = buy_trades.iloc[i]['cost']
sell_revenue = sell_trades.iloc[i]['revenue']
pnl = sell_revenue - buy_cost
trade_pnl.append(pnl)
win_trades = [pnl for pnl in trade_pnl if pnl > 0]
win_rate = len(win_trades) / len(trade_pnl) if trade_pnl else 0
avg_win = np.mean(win_trades) if win_trades else 0
avg_loss = np.mean([pnl for pnl in trade_pnl if pnl < 0]) if trade_pnl else 0
else:
total_trades = 0
win_rate = 0
avg_win = 0
avg_loss = 0
else:
total_trades = 0
win_rate = 0
avg_win = 0
avg_loss = 0
return {
'total_return': total_return,
'annual_return': annual_return,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'total_trades': total_trades,
'win_rate': win_rate,
'avg_win': avg_win,
'avg_loss': avg_loss,
'profit_factor': abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
}
# 執行回測範例
def run_backtest_example():
"""執行回測範例"""
# 創建回測框架
backtest = BacktestFramework(initial_capital=100000)
# 載入歷史數據
data = backtest.load_historical_data('BTC', '2023-01-01', '2024-01-01')
# 計算技術指標
data = backtest.calculate_indicators(data)
# 生成交易信號
data = backtest.generate_signals(data, 'ma_crossover')
# 執行回測
backtest.execute_backtest(data)
# 生成績效報告
performance = backtest.generate_performance_report()
print("回測績效報告:")
print(f"總收益率: {performance['total_return']:.2%}")
print(f"年化收益率: {performance['annual_return']:.2%}")
print(f"年化波動率: {performance['volatility']:.2%}")
print(f"夏普比率: {performance['sharpe_ratio']:.2f}")
print(f"最大回撤: {performance['max_drawdown']:.2%}")
print(f"總交易次數: {performance['total_trades']}")
print(f"勝率: {performance['win_rate']:.2%}")
print(f"盈虧比: {performance['profit_factor']:.2f}")
return backtest, performance
# 執行回測
backtest_result, performance_metrics = run_backtest_example()
class PaperTradingEngine:
"""虛擬交易引擎"""
def __init__(self, initial_balance=100000):
self.balance = initial_balance
self.positions = {}
self.orders = {}
self.trade_history = []
self.is_running = False
async def start_paper_trading(self, strategy, market_data_feed):
"""開始虛擬交易"""
self.is_running = True
print("🚀 虛擬交易系統啟動")
while self.is_running:
try:
# 獲取實時市場數據
market_data = await market_data_feed.get_latest_data()
# 策略生成信號
signals = strategy.generate_signals(market_data)
# 執行虛擬交易
for signal in signals:
await self._execute_paper_order(signal, market_data)
# 更新投資組合
await self._update_portfolio(market_data)
# 記錄狀態
await self._log_status()
# 等待下個週期
await asyncio.sleep(1)
except Exception as e:
print(f"❌ 虛擬交易錯誤: {e}")
await asyncio.sleep(5)
async def _execute_paper_order(self, signal, market_data):
"""執行虛擬訂單"""
symbol = signal['symbol']
side = signal['side']
size = signal['size']
# 獲取當前價格
current_price = market_data[symbol]['price']
# 模擬滑價
slippage = self._calculate_slippage(symbol, size, side)
execution_price = current_price * (1 + slippage)
# 模擬手續費
commission = size * execution_price * 0.001
# 執行交易
if side == 'buy':
required_capital = size * execution_price + commission
if required_capital <= self.balance:
self.balance -= required_capital
self.positions[symbol] = self.positions.get(symbol, 0) + size
trade_record = {
'timestamp': datetime.now(),
'symbol': symbol,
'side': side,
'size': size,
'price': execution_price,
'commission': commission,
'slippage': slippage,
'status': 'filled'
}
self.trade_history.append(trade_record)
print(f"📈 虛擬買入: {size} {symbol} @ ${execution_price:.2f}")
else:
print(f"❌ 餘額不足,無法買入 {symbol}")
elif side == 'sell':
if symbol in self.positions and self.positions[symbol] >= size:
revenue = size * execution_price - commission
self.balance += revenue
self.positions[symbol] -= size
if self.positions[symbol] == 0:
del self.positions[symbol]
trade_record = {
'timestamp': datetime.now(),
'symbol': symbol,
'side': side,
'size': size,
'price': execution_price,
'commission': commission,
'slippage': slippage,
'status': 'filled'
}
self.trade_history.append(trade_record)
print(f"📉 虛擬賣出: {size} {symbol} @ ${execution_price:.2f}")
else:
print(f"❌ 持倉不足,無法賣出 {symbol}")
def _calculate_slippage(self, symbol, size, side):
"""計算滑價"""
# 簡化的滑價模型
base_slippage = 0.0001 # 0.01% 基礎滑價
size_impact = size * 0.000001 # 根據訂單大小調整
slippage = base_slippage + size_impact
# 買入時滑價為正,賣出時為負
return slippage if side == 'buy' else -slippage
async def _update_portfolio(self, market_data):
"""更新投資組合價值"""
total_value = self.balance
for symbol, quantity in self.positions.items():
if symbol in market_data:
current_price = market_data[symbol]['price']
position_value = quantity * current_price
total_value += position_value
return total_value
async def _log_status(self):
"""記錄狀態"""
# 每小時記錄一次詳細狀態
current_time = datetime.now()
if current_time.minute == 0:
portfolio_value = await self._update_portfolio({})
print(f"💰 投資組合價值: ${portfolio_value:.2f}")
print(f"💵 現金餘額: ${self.balance:.2f}")
if self.positions:
print(f"📊 持倉: {self.positions}")
class PaperTradingStrategy:
"""虛擬交易策略範例"""
def __init__(self):
self.last_signals = {}
def generate_signals(self, market_data):
"""生成交易信號"""
signals = []
for symbol, data in market_data.items():
# 簡單的動量策略
current_price = data['price']
# 假設有歷史價格資料
if 'price_history' in data:
price_history = data['price_history']
if len(price_history) >= 20:
sma_20 = sum(price_history[-20:]) / 20
# 價格突破20日均線
if current_price > sma_20 * 1.02: # 突破2%
signals.append({
'symbol': symbol,
'side': 'buy',
'size': 0.1,
'strategy': 'momentum_breakout'
})
elif current_price < sma_20 * 0.98: # 跌破2%
signals.append({
'symbol': symbol,
'side': 'sell',
'size': 0.1,
'strategy': 'momentum_breakdown'
})
return signals
# 虛擬交易範例
async def run_paper_trading_example():
"""運行虛擬交易範例"""
# 創建虛擬交易引擎
paper_engine = PaperTradingEngine(initial_balance=100000)
# 創建策略
strategy = PaperTradingStrategy()
# 模擬市場數據源
class MockMarketDataFeed:
async def get_latest_data(self):
# 模擬實時數據
return {
'BTCUSDT': {
'price': 50000 + np.random.normal(0, 500),
'volume': 1000,
'price_history': [49000 + i * 10 for i in range(25)]
}
}
market_feed = MockMarketDataFeed()
# 運行虛擬交易(這裡只運行幾個週期作為演示)
print("開始虛擬交易演示...")
for i in range(5):
market_data = await market_feed.get_latest_data()
signals = strategy.generate_signals(market_data)
for signal in signals:
await paper_engine._execute_paper_order(signal, market_data)
portfolio_value = await paper_engine._update_portfolio(market_data)
print(f"週期 {i+1}: 投資組合價值 ${portfolio_value:.2f}")
await asyncio.sleep(0.1) # 模擬時間間隔
# 如果在異步環境中運行
# await run_paper_trading_example()
class PaperVsLiveTradingAnalyzer:
"""虛擬交易與實盤交易差異分析"""
def __init__(self):
self.differences = {
'execution': {
'slippage': {
'paper': '模擬滑價,可能過於樂觀',
'live': '真實滑價,特別是大單或低流動性時'
},
'fill_rate': {
'paper': '100%成交,無部分成交',
'live': '可能部分成交或完全不成交'
},
'latency': {
'paper': '無網路延遲',
'live': '網路延遲影響執行價格'
}
},
'psychological': {
'pressure': {
'paper': '無真實資金壓力',
'live': '真實資金的心理壓力'
},
'discipline': {
'paper': '容易遵守規則',
'live': '情緒影響決策執行'
},
'risk_tolerance': {
'paper': '風險承受度較高',
'live': '真實損失時風險厭惡增加'
}
},
'market_impact': {
'order_book': {
'paper': '不影響市場',
'live': '大單可能影響價格'
},
'timing': {
'paper': '完美時機執行',
'live': '交易所維護、網路問題等'
}
}
}
def generate_transition_plan(self):
"""生成從虛擬到實盤的過渡計劃"""
plan = {
'phase_1': {
'duration': '2-4週',
'goal': '驗證策略邏輯',
'actions': [
'在虛擬環境中運行策略',
'記錄所有交易決策',
'分析策略表現',
'優化參數設定'
],
'success_criteria': [
'正收益率',
'合理的夏普比率',
'可控的最大回撤'
]
},
'phase_2': {
'duration': '1-2週',
'goal': '小額實盤驗證',
'actions': [
'使用最小金額實盤交易',
'比較虛擬與實盤結果',
'記錄心理變化',
'調整執行參數'
],
'success_criteria': [
'實盤表現接近虛擬交易',
'能夠控制情緒',
'嚴格執行策略'
]
},
'phase_3': {
'duration': '4-8週',
'goal': '逐步增加資金',
'actions': [
'每週評估表現',
'逐步增加交易規模',
'持續優化策略',
'建立風控機制'
],
'success_criteria': [
'穩定的正收益',
'控制回撤在可接受範圍',
'建立交易信心'
]
},
'phase_4': {
'duration': '持續',
'goal': '正式運營',
'actions': [
'使用目標資金規模',
'持續監控和優化',
'定期回顧和調整',
'風險管理機制完善'
],
'success_criteria': [
'達到預期收益目標',
'風險控制有效',
'策略持續有效'
]
}
}
return plan
analyzer = PaperVsLiveTradingAnalyzer()
transition_plan = analyzer.generate_transition_plan()
print("從虛擬交易到實盤交易的過渡計劃:")
for phase, details in transition_plan.items():
print(f"\n{phase.upper()}:")
print(f" 期間: {details['duration']}")
print(f" 目標: {details['goal']}")
print(f" 行動:")
for action in details['actions']:
print(f" • {action}")
print(f" 成功標準:")
for criteria in details['success_criteria']:
print(f" ✓ {criteria}")
class GoLiveChecklist:
"""上線檢查清單"""
def __init__(self):
self.checklist = {
'strategy_validation': [
'回測結果滿意(正收益、合理回撤)',
'虛擬交易表現良好(至少4週)',
'小額實盤測試成功',
'策略邏輯經過同行評議',
'參數設定經過優化'
],
'technical_infrastructure': [
'交易系統穩定運行',
'API連接穩定可靠',
'備用系統準備就緒',
'監控系統完善',
'日誌記錄完整'
],
'risk_management': [
'風險控制參數設定',
'止損機制完善',
'最大回撤限制',
'倉位大小控制',
'緊急停止機制'
],
'operational_readiness': [
'交易資金準備充足',
'團隊角色分工明確',
'應急預案制定完成',
'監控值班安排',
'定期評估機制'
],
'compliance_legal': [
'合規要求確認',
'稅務處理準備',
'監管報告機制',
'資金來源合法',
'風險披露完整'
]
}
def evaluate_readiness(self, completed_items):
"""評估上線準備度"""
total_items = sum(len(items) for items in self.checklist.values())
completed_count = len(completed_items)
readiness_score = completed_count / total_items
if readiness_score >= 0.9:
recommendation = "準備就緒,可以上線"
risk_level = "低"
elif readiness_score >= 0.8:
recommendation = "基本準備就緒,建議補充剩餘項目"
risk_level = "中低"
elif readiness_score >= 0.7:
recommendation = "需要完成更多準備工作"
risk_level = "中"
else:
recommendation = "準備不足,不建議上線"
risk_level = "高"
return {
'readiness_score': readiness_score,
'recommendation': recommendation,
'risk_level': risk_level,
'completed_items': completed_count,
'total_items': total_items
}
# 使用檢查清單
checklist = GoLiveChecklist()
# 模擬已完成的項目
completed_items = [
'回測結果滿意(正收益、合理回撤)',
'虛擬交易表現良好(至少4週)',
'交易系統穩定運行',
'API連接穩定可靠',
'風險控制參數設定',
'止損機制完善',
'交易資金準備充足'
]
evaluation = checklist.evaluate_readiness(completed_items)
print(f"上線準備度評估:")
print(f"完成度: {evaluation['readiness_score']:.1%}")
print(f"建議: {evaluation['recommendation']}")
print(f"風險等級: {evaluation['risk_level']}")
class LiveTradingMonitor:
"""實盤交易監控系統"""
def __init__(self, telegram_notifier=None):
self.telegram_notifier = telegram_notifier
self.alerts = []
self.performance_metrics = {}
async def monitor_trading_performance(self, trading_engine):
"""監控交易表現"""
while True:
try:
# 獲取當前表現
current_metrics = await self._calculate_metrics(trading_engine)
# 檢查異常情況
alerts = self._check_alerts(current_metrics)
# 發送警告
for alert in alerts:
await self._send_alert(alert)
# 記錄表現
self.performance_metrics = current_metrics
# 等待下次檢查
await asyncio.sleep(300) # 5分鐘檢查一次
except Exception as e:
await self._send_alert({
'type': 'system_error',
'message': f'監控系統錯誤: {e}',
'severity': 'high'
})
await asyncio.sleep(60)
async def _calculate_metrics(self, trading_engine):
"""計算關鍵指標"""
portfolio_value = await trading_engine.get_portfolio_value()
daily_pnl = await trading_engine.get_daily_pnl()
open_positions = await trading_engine.get_open_positions()
return {
'portfolio_value': portfolio_value,
'daily_pnl': daily_pnl,
'open_positions_count': len(open_positions),
'total_exposure': sum(pos['value'] for pos in open_positions),
'timestamp': datetime.now()
}
def _check_alerts(self, metrics):
"""檢查警告條件"""
alerts = []
# 檢查日損失
if metrics['daily_pnl'] < -5000: # 日損失超過5000
alerts.append({
'type': 'high_daily_loss',
'message': f'日損失過大: ${metrics["daily_pnl"]:.2f}',
'severity': 'high'
})
# 檢查持倉集中度
if metrics['open_positions_count'] > 10:
alerts.append({
'type': 'high_position_count',
'message': f'持倉數量過多: {metrics["open_positions_count"]}',
'severity': 'medium'
})
# 檢查總敞口
max_exposure = 50000 # 最大敞口5萬
if metrics['total_exposure'] > max_exposure:
alerts.append({
'type': 'high_exposure',
'message': f'總敞口過大: ${metrics["total_exposure"]:.2f}',
'severity': 'high'
})
return alerts
async def _send_alert(self, alert):
"""發送警告"""
self.alerts.append(alert)
if self.telegram_notifier:
message = f"🚨 {alert['severity'].upper()} ALERT\n"
message += f"類型: {alert['type']}\n"
message += f"訊息: {alert['message']}\n"
message += f"時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
await self.telegram_notifier.send_message(message)
print(f"⚠️ ALERT: {alert['message']}")
# 上線監控範例
async def setup_live_monitoring():
"""設置實盤監控"""
monitor = LiveTradingMonitor()
# 假設有交易引擎
class MockTradingEngine:
async def get_portfolio_value(self):
return 98500 # 模擬投資組合價值
async def get_daily_pnl(self):
return -1500 # 模擬日損益
async def get_open_positions(self):
return [
{'symbol': 'BTCUSDT', 'value': 10000},
{'symbol': 'ETHUSDT', 'value': 5000}
]
trading_engine = MockTradingEngine()
# 運行監控(演示版本)
metrics = await monitor._calculate_metrics(trading_engine)
alerts = monitor._check_alerts(metrics)
print("監控指標:")
for key, value in metrics.items():
if key != 'timestamp':
print(f" {key}: {value}")
if alerts:
print("\n發現警告:")
for alert in alerts:
print(f" {alert['severity']}: {alert['message']}")
else:
print("\n✅ 系統狀態正常")
# 運行監控演示
# await setup_live_monitoring()
今天我們學習了量化交易從想法到實現的完整流程,就像農夫從試驗田到大規模種植的過程。重要概念包括:
回測階段:
虛擬交易階段:
正式上線階段:
關鍵成功因素:
記住爸爸說過的話:「種田要先在小塊地試驗,成功了再擴大規模」。量化交易也是如此,每個階段都有其重要作用,不可跳躍!
明天我們將深入學習期現套利策略的具體實作,這是量化交易中相對穩健的策略之一。
下一篇:Day 23 - 範例期現套利策略